Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the DynamicFileCatalog in datafusion-catalog #11035

Merged
merged 60 commits into from
Sep 9, 2024

Conversation

goldmedal
Copy link
Contributor

@goldmedal goldmedal commented Jun 20, 2024

Which issue does this PR close?

Closes #10986 .

Rationale for this change

Follow the idea of #4838 to implement DynamicFileCatalog in datafusion-catalog. Users can enable this feature for an existing SessionContext by SessoinContext::enable_url_table.

let ctx = SessionContext::new().enable_url_table();
ctx.sql("SELECT * FROM 'tests/data/example.csv' as example").await?.show().await?;

What changes are included in this PR?

  • Implement SessionStore to access SessionState for he runtime table building.
  • In datafusion-cli, DynamicFileCatalog is renamed to DynamicObjectStoreCatalog because it's responsible for registering the required object store now.

Are these changes tested?

yes

Are there any user-facing changes?

  • add new API of SessoinContext: enable_url_table
  • add new feature: home_dir for the URL table.

@github-actions github-actions bot added the core Core DataFusion crate label Jun 20, 2024
@@ -305,10 +306,18 @@ impl SessionContext {

/// Creates a new `SessionContext` using the provided [`SessionState`]
pub fn new_with_state(state: SessionState) -> Self {
let state_ref = Arc::new(RwLock::new(state.clone()));
state
.schema_for_ref("datafusion.public.xx")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just a workaround to get DynamicFileSchemaProvider. I'll refactor it later.

Comment on lines 53 to 62
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return Ok(inner_table);
}
let optimized_url = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_url.as_str())?;
let state = &self
.state_store
.get_state()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaProvider::table won't have SessionState by default. We should get it from StateStore.

let df = ctx
.sql(
format!(
r#"SELECT column_1, MIN(column_12), MAX(column_12)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column is c1 actually. There're some issues about getting CSV header automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I found that this is the default behavior for ListTable. The dynamic query in datafusion-cli is the same as this. I think we don't need to change it in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior is controlled by the config option in the current session context. We can create the session context like

let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
let ctx = SessionContext::new_with_config(cfg);

to enable the header scanning.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jun 21, 2024
@@ -19,7 +19,7 @@

# Make a table with multiple input partitions
statement ok
CREATE TABLE data AS
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there're a file or directory called data in the working directory. However, I think it's really weird. IMO, we should only match a string literal like select * from '/xxx/aaa/data.csv' but the normal identifier of a table also be matched.
I'll do more research for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I did an experimental for DataFusion v39.0.0 like

(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ mkdir datas 
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ cp aggregate_test_100.csv datas 
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ datafusion-cli                 
DataFusion CLI v39.0.0
> select * from datas;
+----------+----------+----------+----------+-------------+----------------------+----------+----------+------------+----------------------+-------------+---------------------+--------------------------------+
| column_1 | column_2 | column_3 | column_4 | column_5    | column_6             | column_7 | column_8 | column_9   | column_10            | column_11   | column_12           | column_13                      |
+----------+----------+----------+----------+-------------+----------------------+----------+----------+------------+----------------------+-------------+---------------------+--------------------------------+

I created a folder datas and place a csv in this, then start the cli in this path. I can directly query this folder without single-quote string literal.

For a single file, it doesn't work

(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ cp aggregate_test_100.csv data
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ datafusion-cli                
DataFusion CLI v39.0.0
> select * from data;
Error during planning: table 'datafusion.public.data' not found

I think it should be an issue to match only the string literal as the dynamic file table name.

@@ -64,6 +72,13 @@ SELECT * FROM arrow_partitioned ORDER BY f0;
3 baz true 456
4 NULL NULL 456

# dynamic select arrow file in the folder
query ITB
SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dynamic query doesn't support the partitioned folder. It could be an enhancement issue for it.

@goldmedal goldmedal changed the title [DRAFT] Implement DynamicFileSchemaProvider in the core Implement DynamicFileSchemaProvider in the core Jun 22, 2024
@goldmedal goldmedal marked this pull request as ready for review June 22, 2024 05:35
@alamb
Copy link
Contributor

alamb commented Jun 22, 2024

Thanks @goldmedal -- I hope to review this PR this wekeend, likely tomorrow

@alamb
Copy link
Contributor

alamb commented Jun 28, 2024

I am sorry for the delay -- I plan to review this tomorrow

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, thank you so much @goldmedal -- this is really cool

I have a few thoughts

Security implications

I worry about its security implications (this will now allow anything built on DataFusion 's SessionContext to read arbitrary local files even if https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SQLOptions.html#method.with_allow_dml is disabled

I think we should therefore not register this provider with the SessionContext by default and instead add some examples showing how to register it by itself

Structure / dependencies

As the code is currently structured (not related to this PR) it seems like this has to go into the core crate. It seems to me it would be better if we could find some way to start breaking up the core (e.g. remove the catalog providers, etc)

I am still thinking about this

datafusion-examples/examples/csv_sql.rs Outdated Show resolved Hide resolved
datafusion/sqllogictest/test_files/describe.slt Outdated Show resolved Hide resolved
datafusion/core/src/catalog/dynamic_file_schema.rs Outdated Show resolved Hide resolved
datafusion/core/Cargo.toml Outdated Show resolved Hide resolved
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Aug 20, 2024
@goldmedal goldmedal requested a review from alamb August 26, 2024 14:14
@alamb
Copy link
Contributor

alamb commented Aug 26, 2024

Thanks @goldmedal -- I'll try and review this shortly

@goldmedal goldmedal force-pushed the feature/10986-dynamic-table-provider branch from 7fef748 to f7b4b8c Compare September 4, 2024 14:49
@@ -584,9 +584,9 @@ dependencies = [

[[package]]
name = "aws-sdk-sts"
version = "1.41.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we execute cargo update, the version will be updated to 1.41.0 which isn't compatible with Rust 1.76, then the msrv check will fail. I'm not sure if there is a better way to limit the version in the cargo toml file to avoid the update command updates an invalid version 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have actually worked around this on #12032 (comment)

@alamb
Copy link
Contributor

alamb commented Sep 5, 2024

I will review this PR over the next day or two

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR is I think this PR is really nice @goldmedal -- the code is well structured and commented, and I think this will be a very useful feature for users of DataFusion

What I would like to do is to try and create an example in datafusion-examples of a program that run queries from arbitrary URLs. SELECT * from 'my-s3://foo.com/data'

I'll try and work on that today or tomorrow -- I suspect in trying to make that example it will become apparent what, if any, gaps remain in the APIs in terms of registering ObjectStorees etc.

Comment on lines +66 to +73
// dynamic query by the file path
ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;

// print the results
df.show().await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @alamb. I added a simple s3 example here. I hope it is what you want or that it could inspire you for a new example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No actually this is perfect -- thank you @goldmedal

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @goldmedal

I plan to leave this PR open until Monday so anyone else who is interested can take a look at it prior to merge.

I left some suggestions on how to potentially improve the comments / documentation, but I think we (I can do it) as a follow on PR too

#[derive(Default)]
pub struct DynamicListTableFactory {
/// The session store that contains the current session.
session_store: Arc<SessionStore>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW since the SessionStore has an Arc inside it, this extra level of Arc may not be necessary

/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @korowa changed the default so that has_header is true by default now so this line is unecessary

Comment on lines 360 to 362
/// Enable the dynamic file query for the current session.
/// See [DynamicFileCatalog] for more details
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Enable the dynamic file query for the current session.
/// See [DynamicFileCatalog] for more details
///
/// Enable dynamic file querying for the current session.
///
/// This allows queries to directly access arbitrary file names via SQL like
/// `SELECT * from 'my_file.parquet'`
/// so it should only be enabled for systems that such access is not a security risk
///
/// See [DynamicFileCatalog] for more details
///

let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let path = path.join("tests/tpch-csv/customer.csv");
let url = format!("file://{}", path.display());
let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise here it isn't necessary to set the has_header flag anymore I don't think

@@ -43,6 +43,10 @@ SELECT * FROM arrow_simple
3 baz false
4 NULL true

# url table is only supported by DynamicFileCatalog
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding context here about why this is an important test would make it easier to understand for future readers

Suggested change
# url table is only supported by DynamicFileCatalog
# Ensure that local files can not be read by default (a potential security issue)
# (url table is only supported when DynamicFileCatalog is enabled)

~id9~ ~value9~

query TTT
DESCRIBE '../core/tests/data/aggregate_simple.csv';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines 18 to 19
//! dynamic_file contains [`DynamicFileCatalog`] that creates tables from file paths
//! if the wrapped [`CatalogProviderList`] doesn't have the table provider.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest making module level documentation (which is rendered typically as a single line) point to the main struct/trati

Suggested change
//! dynamic_file contains [`DynamicFileCatalog`] that creates tables from file paths
//! if the wrapped [`CatalogProviderList`] doesn't have the table provider.
//! [`DynamicFileCatalog`] that creates tables from file paths

Comment on lines 114 to 117
/// Implements the [DynamicFileSchemaProvider] that can create tables provider from the file path.
///
/// The provider will try to create a table provider from the file path if the table provider
/// isn't exist in the inner schema provider. The required object store must be registered in the session context.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the reference to object_store here -- it is up to the UrlTableFactory to handle resolving urls to TableProviders, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for example, how DynamicListTableFactory resolves the URL depends on what the session context registers as the object store.
Indeed, it's a legacy comment (before moving to datafusion-catalog). We don't need to mention the object store here. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove the part of the object store here. Thanks for mentioning it.

# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#
# Note: This file runs with a SessionContext that has the `enable_url_table` flag set
#

Comment on lines +1846 to +1855
let session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(cfg)
.build();
let ctx = SessionContext::new_with_state(session_state).enable_url_table();
let result = plan_and_collect(
&ctx,
format!("select c_name from '{}' limit 3;", &url).as_str(),
)
.await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be possible here to add enable_url_table as a argument to SessionStateBuilder

So it could look like

let session_state =  SessionStateBuilder::new()
            .with_default_features()
            .with_config(cfg)
            .enable_url_table()
            .build();
let ctx = SessionContext::from(session_state);

🤔

(we can do this as a follow on as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I think it's a good idea. We can have a flag in the builder. If it's true, we can wrap the catalog list implicitly. It may need some additional implementation and test cases. I prefer to do it in the follow-up PR.

@goldmedal
Copy link
Contributor Author

I plan to leave this PR open until Monday so anyone else who is interested can take a look at it prior to merge.

👍

I left some suggestions on how to potentially improve the comments / documentation, but I think we (I can do it) as a follow on PR too

Thanks @alamb I try to address the suggestions here. Only about the comment related to SessionStateBuilder, I prefer to do it in another PR.

@alamb alamb merged commit 9bc39a0 into apache:main Sep 9, 2024
26 checks passed
@goldmedal goldmedal deleted the feature/10986-dynamic-table-provider branch September 9, 2024 14:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
catalog Related to the catalog crate core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement DynamicTableProvider in DataFusion Core
3 participants